package com.sea.bei


import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row
/**
  * Date: 2021/11/5 12:18
  *
  * 1. Description:
  *    Tests running Flink CDC via the Flink SQL API.
  *
  * 2. Notes:
  *    1) Running Flink CDC through SQL requires Flink 1.13.x.
  *    2) Observed: with 'scan.startup.mode' = 'initial' only the rows already in the
  *       table are read and subsequent changes are not captured (the same happens in
  *       Java). Likely cause: checkpointing is not enabled, and the incremental-snapshot
  *       CDC source needs checkpoints to switch from the snapshot phase to binlog reading.
  *    3) Observed: 'scan.startup.mode' = 'latest-offset' works as expected.
  */
object FlinkCDCScalaSQL {

  /**
    * Entry point: creates a MySQL CDC source table via Flink SQL, runs a
    * continuous `select *` over it, and prints the result as a retract stream.
    */
  def main(args: Array[String]): Unit = {

    // 1. Create the streaming execution environment.
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // The mysql-cdc 2.x connector (incremental snapshot enabled by default) requires
    // checkpointing: without it the source reads the initial snapshot but never
    // transitions to the binlog phase, so changes are never captured — this is the
    // behavior described in note 2 of the file header.
    env.enableCheckpointing(5000L)

    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    // 2. Register the CDC source table.
    // Flink SQL rejects enforced primary keys on external tables; a CDC source must
    // declare PRIMARY KEY (...) NOT ENFORCED, otherwise the DDL fails validation.
    tableEnv.executeSql(
      """create table user_info (
        |  id STRING,
        |  name STRING,
        |  sex STRING,
        |  primary key (id) not enforced
        |) with (
        |  'connector' = 'mysql-cdc',
        |  'hostname' = 'localhost',
        |  'port' = '3306',
        |  'username' = 'root',
        |  'password' = '123456',
        |  'database-name' = 'flinkcdc',
        |  'scan.startup.mode' = 'initial',
        |  'table-name' = 'user_info'
        |)""".stripMargin)

    // 3. Continuous query over the CDC table (result is an updating table).
    val table: Table = tableEnv.sqlQuery("select * from user_info")

    // TypeInformation[Row] required implicitly by the Scala bridge conversion below.
    implicit val typeInfo: TypeInformation[Row] = TypeInformation.of(classOf[Row])

    // A retract stream is needed because CDC emits updates/deletes, not only inserts.
    tableEnv.toRetractStream[Row](table).print()

    env.execute()
  }

}
